package com.ztesoft.zsmart.zcm.metrics.utils;

import com.ztesoft.zsmart.core.log.ZSmartLogger;
import com.ztesoft.zsmart.zcm.metrics.domain.dto.MetricsStatsDto;
import com.ztesoft.zsmart.zcm.metrics.service.stats.QueueStatsService;
import com.ztesoft.zsmart.zcm.metrics.service.stats.TomcatStatsService;
import com.ztesoft.zsmart.zcm.metrics.service.storage.InfluxDbStorage;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * In-memory self-monitoring counters for the metrics pipeline.
 *
 * <p>Counters are accumulated through the static {@code addXxx} methods (and, for the
 * public {@link AtomicLong} fields, mutated directly by callers elsewhere in the
 * project). Snapshots are exposed through {@link #getCollectInfo()} and
 * {@link #getMetricsStatsDto()}. An instance of this class is intended to run on a
 * dedicated thread ({@link #run()}) that converts the per-period counters into
 * per-second rates every ten minutes.
 *
 * <p>Thread-safety: all counters are {@link AtomicLong}; the computed rates are
 * {@code volatile} so readers observe the latest value written by the rate thread.
 */
public final class SelfMonitorCache implements Runnable {

    private static final ZSmartLogger logger = ZSmartLogger.getLogger(SelfMonitorCache.class);

    /** Length of the rate-calculation window: 10 minutes, expressed in seconds. */
    private static final BigDecimal RATE_PERIOD_SECONDS = new BigDecimal("600");

    /** Per-measurement write counters, keyed by InfluxDB table (measurement) name. */
    private static final ConcurrentHashMap<String, AtomicLong> influxdbTableCountCache = new ConcurrentHashMap<>();

    // ---- cumulative counters (monotonically increasing, never reset) ----

    private static final AtomicLong collectdNum = new AtomicLong();

    private static final AtomicLong agentNum = new AtomicLong();

    private static final AtomicLong influxdbWriteLineNum = new AtomicLong();

    private static final AtomicLong influxdbWritePointNum = new AtomicLong();

    private static final AtomicLong kafkaConnectorNum = new AtomicLong();

    private static final AtomicLong influxdbConnectorNum = new AtomicLong();

    public static AtomicLong kapacitorConnectorNum = new AtomicLong(0L);

    private static final AtomicLong prometheusNum = new AtomicLong();

    // ---- per-period counters (reset to 0 by run() every 10 minutes) ----

    private static final AtomicLong collectdNumPeriod = new AtomicLong();

    private static final AtomicLong agentNumPeriod = new AtomicLong();

    private static final AtomicLong influxdbWriteLineNumPeriod = new AtomicLong();

    private static final AtomicLong influxdbWritePointNumPeriod = new AtomicLong();

    private static final AtomicLong kafkaConnectorNumPeriod = new AtomicLong();

    private static final AtomicLong influxdbConnectorNumPeriod = new AtomicLong();

    public static AtomicLong kapacitorConnectorNumPeriod = new AtomicLong(0L);

    private static final AtomicLong prometheusNumPeriod = new AtomicLong();

    private static final AtomicLong prometheusTimeseriesListNumTotal = new AtomicLong(0L);

    private static final AtomicLong prometheusTimeseriesListNumMax = new AtomicLong(0L);

    // ---- public counters mutated directly by other pipeline components ----
    // NOTE(review): kept non-final because external code may reset them by reassignment —
    // confirm against callers before tightening.

    /**
     * Elapsed time spent on data translation.
     */
    public static AtomicLong translatorDuration = new AtomicLong(0L);

    public static AtomicLong writeInfluxdbDuration = new AtomicLong(0L);

    public static AtomicLong writeKapacitorDuration = new AtomicLong(0L);

    public static AtomicLong routerInfluxdbDuration = new AtomicLong(0L);

    public static AtomicLong rawWriteInfluxdbDuration = new AtomicLong(0L);

    public static AtomicLong discardMetrics = new AtomicLong(0L);

    public static AtomicLong discardPrometheus = new AtomicLong(0L);

    public static AtomicLong discardKafkaMetrics = new AtomicLong(0L);

    public static AtomicLong discardKapacitorMetrics = new AtomicLong(0L);

    public static AtomicLong discardClickhouseMetrics = new AtomicLong(0L);

    public static AtomicLong setRedisDuration = new AtomicLong(0L);

    public static AtomicLong rpushRedisDuration = new AtomicLong(0L);

    public static AtomicLong delRedisDuration = new AtomicLong(0L);

    public static AtomicLong existsRedisDuration = new AtomicLong(0L);

    public static AtomicLong rpopRedisDuration = new AtomicLong(0L);

    public static AtomicLong getRedisDuration = new AtomicLong(0L);

    public static AtomicLong collectdPipelineDuration = new AtomicLong(0L);

    public static AtomicLong prometheusPipelineDuration = new AtomicLong(0L);

    public static AtomicLong prometheusPipelineReadDuration = new AtomicLong(0L);

    public static AtomicLong collectdPipelineReadDuration = new AtomicLong(0L);

    public static AtomicLong expiredMetricsNum = new AtomicLong(0L);

    public static AtomicLong expiredKapacitorNum = new AtomicLong(0L);

    public static AtomicLong failedWriteMetricsNum = new AtomicLong(0L);

    public static AtomicLong failedWriteKapacitorNum = new AtomicLong(0L);

    public static AtomicLong prometheusNaNvalNum = new AtomicLong(0L);

    public static AtomicLong routerKapacitorDuration = new AtomicLong(0L);

    public static AtomicLong rawWriteKapacitorDuration = new AtomicLong(0L);

    // ---- per-second rates; -1.0 means "not yet computed". Written only by run(),
    // read by snapshot methods, hence volatile for cross-thread visibility. ----

    private static volatile double collectdRate = -1.0;

    private static volatile double agentRate = -1.0;

    private static volatile double influxdbWriteLineRate = -1.0;

    private static volatile double influxdbWritePointRate = -1.0;

    private static volatile double kafkaConnectorRate = -1.0;

    private static volatile double influxdbConnectorRate = -1.0;

    private static volatile double kapacitorConnectorRate = -1.0;

    private static volatile double prometheusRate = -1.0;

    /**
     * Adds {@code num} to the total Prometheus timeseries-list counter and updates the
     * observed maximum.
     *
     * @param num number of timeseries in the current batch
     */
    public static void addPrometheusTimeseriesListNumTotal(long num) {
        prometheusTimeseriesListNumTotal.getAndAdd(num);
        setPrometheusTimeseriesListNumMax(num);
    }

    /**
     * Records {@code num} as the new maximum batch size if it exceeds the current one.
     * NOTE(review): the get/set pair is not atomic, so a concurrent smaller update could
     * overwrite a larger one; acceptable for approximate monitoring stats.
     *
     * @param num candidate maximum
     */
    public static void setPrometheusTimeseriesListNumMax(long num) {
        long max = prometheusTimeseriesListNumMax.get();
        if (num > max) {
            prometheusTimeseriesListNumMax.set(num);
        }
    }

    public static void addCollectdNum() {
        collectdNum.incrementAndGet();
        collectdNumPeriod.incrementAndGet();
    }

    public static void addAgentNum() {
        agentNum.incrementAndGet();
        agentNumPeriod.incrementAndGet();
    }

    public static void addInfluxdbWriteLineNum() {
        influxdbWriteLineNum.incrementAndGet();
        influxdbWriteLineNumPeriod.incrementAndGet();
    }

    public static void addInfluxdbWritePointNum() {
        influxdbWritePointNum.incrementAndGet();
        influxdbWritePointNumPeriod.incrementAndGet();
    }

    public static void addKafkaConnectorNum() {
        kafkaConnectorNum.incrementAndGet();
        kafkaConnectorNumPeriod.incrementAndGet();
    }

    public static void addInfluxdbConnectorNum() {
        influxdbConnectorNum.incrementAndGet();
        influxdbConnectorNumPeriod.incrementAndGet();
    }

    public static void addKapacitorConnectorNum() {
        kapacitorConnectorNum.incrementAndGet();
        kapacitorConnectorNumPeriod.incrementAndGet();
    }

    public static void addPrometheusNum() {
        prometheusNum.incrementAndGet();
        prometheusNumPeriod.incrementAndGet();
    }

    /**
     * Drains a per-period counter and converts it to a per-second rate over the
     * ten-minute window, rounded to two decimal places.
     *
     * @param periodCounter counter that is atomically read and reset to zero
     * @return events per second during the elapsed window
     */
    private static double ratePerSecond(AtomicLong periodCounter) {
        return BigDecimal.valueOf(periodCounter.getAndSet(0))
            .divide(RATE_PERIOD_SECONDS, 2, RoundingMode.HALF_UP)
            .doubleValue();
    }

    /**
     * Calculates the reporting rates over each ten-minute window. Runs until the
     * thread is interrupted; on interruption the interrupt status is restored and the
     * loop exits (the original swallowed the interrupt and spun forever).
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(10 * 60 * 1000L);
                collectdRate = ratePerSecond(collectdNumPeriod);
                agentRate = ratePerSecond(agentNumPeriod);
                influxdbWriteLineRate = ratePerSecond(influxdbWriteLineNumPeriod);
                influxdbWritePointRate = ratePerSecond(influxdbWritePointNumPeriod);
                kafkaConnectorRate = ratePerSecond(kafkaConnectorNumPeriod);
                influxdbConnectorRate = ratePerSecond(influxdbConnectorNumPeriod);
                kapacitorConnectorRate = ratePerSecond(kapacitorConnectorNumPeriod);
                prometheusRate = ratePerSecond(prometheusNumPeriod);
            }
            catch (InterruptedException e) {
                logger.error("calculate kpi rate error:", e);
                // Preserve the interrupt status for the owner of this thread and stop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Builds the fields map used both for self-reported metrics and for the status
     * statistics API.
     *
     * <p>The eight core counters and the rates are snapshotted as plain numbers. The
     * duration/discard entries intentionally keep the original behavior of exposing the
     * live {@link AtomicLong} objects (they serialize as their numeric value).
     *
     * @return ordered map of counter name to value
     */
    private static Map<String, Object> getReportInfo() {
        Map<String, Object> info = new LinkedHashMap<>();
        info.put("collectdNum", collectdNum.get());
        info.put("agentNum", agentNum.get());
        info.put("influxdbWriteLineNum", influxdbWriteLineNum.get());
        info.put("influxdbWritePointNum", influxdbWritePointNum.get());
        info.put("kafkaConnectorNum", kafkaConnectorNum.get());
        info.put("influxdbConnectorNum", influxdbConnectorNum.get());
        // Fixed: previously stored the AtomicLong itself, unlike the other seven counters.
        info.put("kapacitorConnectorNum", kapacitorConnectorNum.get());
        info.put("prometheusNum", prometheusNum.get());
        info.put("collectdRate", collectdRate);
        info.put("agentRate", agentRate);
        info.put("influxdbWriteLineRate", influxdbWriteLineRate);
        info.put("influxdbWritePointRate", influxdbWritePointRate);
        info.put("kafkaConnectorRate", kafkaConnectorRate);
        info.put("influxdbConnectorRate", influxdbConnectorRate);
        info.put("kapacitorConnectorRate", kapacitorConnectorRate);
        info.put("prometheusRate", prometheusRate);
        info.put("translatorDuration", translatorDuration);
        info.put("writeInfluxdbDuration", writeInfluxdbDuration);
        info.put("writeKapacitorDuration", writeKapacitorDuration);
        info.put("routerInfluxdbDuration", routerInfluxdbDuration);
        info.put("rawWriteInfluxdbDuration", rawWriteInfluxdbDuration);
        info.put("discardMetrics", discardMetrics);
        info.put("discardPrometheus", discardPrometheus);
        info.put("discardKafkaMetrics", discardKafkaMetrics);
        info.put("discardKapacitorMetrics", discardKapacitorMetrics);
        info.put("discardClickhouseMetrics", discardClickhouseMetrics);
        info.put("setRedisDuration", setRedisDuration);
        info.put("rpushRedisDuration", rpushRedisDuration);
        info.put("delRedisDuration", delRedisDuration);
        info.put("existsRedisDuration", existsRedisDuration);
        info.put("rpopRedisDuration", rpopRedisDuration);
        info.put("getRedisDuration", getRedisDuration);
        info.put("collectdPipelineDuration", collectdPipelineDuration);
        info.put("collectdPipelineReadDuration", collectdPipelineReadDuration);
        info.put("prometheusPipelineDuration", prometheusPipelineDuration);
        info.put("prometheusTimeseriesListNumMax", prometheusTimeseriesListNumMax);
        info.put("prometheusTimeseriesListNumTotal", prometheusTimeseriesListNumTotal);
        info.put("expiredMetricsNum", expiredMetricsNum);
        info.put("expiredKapacitorNum", expiredKapacitorNum);
        info.put("failedWriteMetricsNum", failedWriteMetricsNum);
        info.put("failedWriteKapacitorNum", failedWriteKapacitorNum);
        info.put("prometheusNaNvalNum", prometheusNaNvalNum);
        info.put("routerKapacitorDuration", routerKapacitorDuration);
        info.put("rawWriteKapacitorDuration", rawWriteKapacitorDuration);
        info.put("prometheusPipelineReadDuration", prometheusPipelineReadDuration);
        return info;
    }

    /**
     * Builds the fields map for the automatically reported self-monitoring metric.
     *
     * @return unmodifiable map combining queue statistics with {@link #getReportInfo()}
     */
    public static Map<String, Object> getCollectInfo() {
        QueueStatsService queueStatsService = SpringContextUtil.getBean(QueueStatsService.class);
        InfluxDbStorage influxDbStorage = SpringContextUtil.getBean(InfluxDbStorage.class);
        Map<String, Object> map = new LinkedHashMap<>();
        // Query queue/alarm count details.
        map.put("enqueueRate", queueStatsService.enqueueRate());
        Map<String, Long> dequeueRates = queueStatsService.dequeueRates();
        Iterator<Map.Entry<String, Long>> dequeueRatesIterator = dequeueRates.entrySet().iterator();
        while (dequeueRatesIterator.hasNext()) {
            Map.Entry<String, Long> dequeueRatesNext = dequeueRatesIterator.next();
            map.put("dequeueRates" + dequeueRatesNext.getKey(), dequeueRatesNext.getValue());
        }
        Map<String, Long> totalDequeues = queueStatsService.totalDequeues();
        Iterator<Map.Entry<String, Long>> totalDequeuesIterator = totalDequeues.entrySet().iterator();
        while (totalDequeuesIterator.hasNext()) {
            Map.Entry<String, Long> totalDequeuesNext = totalDequeuesIterator.next();
            map.put("totalDequeues" + totalDequeuesNext.getKey(), totalDequeuesNext.getValue());
        }
        map.put("totalEnqueue", queueStatsService.totalEnqueue());
        // NOTE(review): key "failedDequeue" is populated from failedEnqueue() — confirm
        // this name/value mismatch is intentional (getMetricsStatsDto does the same).
        map.put("failedDequeue", queueStatsService.failedEnqueue());
        // Snapshot once instead of rebuilding the report map for every lookup.
        Map<String, Object> report = getReportInfo();
        Long unconsumedNum = new BigDecimal(queueStatsService.totalEnqueue())
            .subtract(new BigDecimal(String.valueOf(report.get("influxdbConnectorNum"))))
            .subtract(new BigDecimal(String.valueOf(report.get("expiredMetricsNum"))))
            .longValue();
        map.put("unconsumedNum", unconsumedNum);
        map.put("influxdbMetricsQueueSize", influxDbStorage.getQueueSize());
        map.putAll(report);
        return Collections.unmodifiableMap(map);
    }

    /**
     * Assembles the DTO for the metrics status statistics API.
     *
     * @return populated {@link MetricsStatsDto}
     */
    public static MetricsStatsDto getMetricsStatsDto() {
        QueueStatsService queueStatsService = SpringContextUtil.getBean(QueueStatsService.class);
        InfluxDbStorage influxDbStorage = SpringContextUtil.getBean(InfluxDbStorage.class);
        TomcatStatsService tomcatStatsService = SpringContextUtil.getBean(TomcatStatsService.class);
        MetricsStatsDto metricsStatsDto = new MetricsStatsDto();
        metricsStatsDto.setEnqueueRate(queueStatsService.enqueueRate());
        metricsStatsDto.setDequeueRates(queueStatsService.dequeueRates());
        metricsStatsDto.setTotalEnqueue(queueStatsService.totalEnqueue());
        metricsStatsDto.setTotalDequeues(queueStatsService.totalDequeues());
        // NOTE(review): "failedDequeue" populated from failedEnqueue() — see getCollectInfo.
        metricsStatsDto.setFailedDequeue(queueStatsService.failedEnqueue());
        metricsStatsDto.setTomcat(tomcatStatsService.stat());
        Map<String, Object> info = getReportInfo();
        info.put("tableCounter", getTableCountCache());
        metricsStatsDto.setReportKpi(Collections.unmodifiableMap(info));
        metricsStatsDto.setInfluxdbMetricsQueueSize(influxDbStorage.getQueueSize());
        Long unconsumedNum = new BigDecimal(queueStatsService.totalEnqueue())
            .subtract(new BigDecimal(String.valueOf(info.get("influxdbConnectorNum"))))
            .subtract(new BigDecimal(String.valueOf(info.get("expiredMetricsNum"))))
            .longValue();
        metricsStatsDto.setUnconsumedNum(unconsumedNum);
        return metricsStatsDto;
    }

    /**
     * Adds {@code size} points to the counter for measurement {@code name}.
     *
     * <p>Uses {@code computeIfAbsent} so the create-then-add is race-free; the previous
     * containsKey/put pair could lose an entire increment when two threads raced on a
     * new key.
     *
     * @param name measurement (table) name
     * @param size number of points written
     */
    public static void countTable(String name, Long size) {
        influxdbTableCountCache.computeIfAbsent(name, k -> new AtomicLong()).getAndAdd(size);
    }

    /**
     * @return read-only view of the per-measurement counters
     */
    public static Map<String, AtomicLong> getTableCountCache() {
        return Collections.unmodifiableMap(influxdbTableCountCache);
    }

}
